Code Coverage
 
Classes and Traits
Functions and Methods
Lines
Total
0.00% covered (danger)
0.00%
0 / 1
50.00% covered (danger)
50.00%
8 / 16
CRAP
53.77% covered (warning)
53.77%
57 / 106
Job
0.00% covered (danger)
0.00%
0 / 1
50.00% covered (danger)
50.00%
8 / 16
164.02
53.77% covered (warning)
53.77%
57 / 106
 __construct
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
6 / 6
 getName
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
1 / 1
 getSteps
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
1 / 1
 getStep
0.00% covered (danger)
0.00%
0 / 1
12.00
0.00% covered (danger)
0.00%
0 / 4
 getStepNames
0.00% covered (danger)
0.00%
0 / 1
6.00
0.00% covered (danger)
0.00%
0 / 4
 getJobRepository
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
1 / 1
 __toString
0.00% covered (danger)
0.00%
0 / 1
2.00
0.00% covered (danger)
0.00%
0 / 1
 execute
0.00% covered (danger)
0.00%
0 / 1
15.65
43.90% covered (danger)
43.90%
18 / 41
 doExecute
0.00% covered (danger)
0.00%
0 / 1
4.01
91.67% covered (success)
91.67%
11 / 12
 handleStep
0.00% covered (danger)
0.00%
0 / 1
9.66
42.86% covered (danger)
42.86%
6 / 14
 dispatchJobExecutionEvent
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
3 / 3
 dispatch
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
2 / 2
 getDefaultExitStatusForFailure
0.00% covered (danger)
0.00%
0 / 1
12.00
0.00% covered (danger)
0.00%
0 / 6
 updateStatus
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
2 / 2
 createWorkingDirectory
0.00% covered (danger)
0.00%
0 / 1
2.26
60.00% covered (warning)
60.00%
3 / 5
 deleteWorkingDirectory
100.00% covered (success)
100.00%
1 / 1
2
100.00% covered (success)
100.00%
3 / 3
<?php
namespace Akeneo\Tool\Component\Batch\Job;
use Akeneo\Tool\Component\Batch\Event\EventInterface;
use Akeneo\Tool\Component\Batch\Event\JobExecutionEvent;
use Akeneo\Tool\Component\Batch\Model\JobExecution;
use Akeneo\Tool\Component\Batch\Model\StepExecution;
use Akeneo\Tool\Component\Batch\Step\StepInterface;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Filesystem\Exception\IOException;
use Symfony\Component\Filesystem\Filesystem;
/**
 * Implementation of the {@link Job} interface.
 *
 * Inspired by Spring Batch org.springframework.batch.core.job.AbstractJob;
 *
 * @author    Benoit Jacquemont <benoit@akeneo.com>
 * @copyright 2013 Akeneo SAS (http://www.akeneo.com)
 * @license   http://opensource.org/licenses/MIT MIT
 */
class Job implements JobInterface
{
    /** @var string */
    protected $name;
    /* @var EventDispatcherInterface */
    protected $eventDispatcher;
    /* @var JobRepositoryInterface */
    protected $jobRepository;
    /** @var array */
    protected $steps;
    /** @var Filesystem */
    protected $filesystem;
    /**
     * @param string                   $name
     * @param EventDispatcherInterface $eventDispatcher
     * @param JobRepositoryInterface   $jobRepository
     * @param StepInterface[]          $steps
     */
    public function __construct(
        $name,
        EventDispatcherInterface $eventDispatcher,
        JobRepositoryInterface $jobRepository,
        array $steps = []
    ) {
        $this->name = $name;
        $this->eventDispatcher = $eventDispatcher;
        $this->jobRepository = $jobRepository;
        $this->steps = $steps;
        $this->filesystem = new Filesystem();
    }
    /**
     * Get the job's name
     *
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }
    /**
     * Return all the steps
     *
     * @return array steps
     */
    public function getSteps()
    {
        return $this->steps;
    }
    /**
     * Retrieve the step with the given name. If there is no Step with the given
     * name, then return null.
     *
     * @param string $stepName
     *
     * @return StepInterface the Step
     */
    public function getStep($stepName)
    {
        foreach ($this->steps as $step) {
            if ($step->getName() == $stepName) {
                return $step;
            }
        }
        return null;
    }
    /**
     * Retrieve the step names.
     *
     * @return array the step names
     */
    public function getStepNames()
    {
        $names = [];
        foreach ($this->steps as $step) {
            $names[] = $step->getName();
        }
        return $names;
    }
    /**
     * Public getter for the {@link JobRepositoryInterface} that is needed to manage the
     * state of the batch meta domain (jobs, steps, executions) during the life
     * of a job.
     *
     * @return JobRepositoryInterface
     */
    public function getJobRepository()
    {
        return $this->jobRepository;
    }
    /**
     * To string
     *
     * @return string
     */
    public function __toString()
    {
        return get_class($this) . ': [name=' . $this->name . ']';
    }
    /**
     * Run the specified job, handling all listener and repository calls, and
     * delegating the actual processing to {@link #doExecute(JobExecution)}.
     * @param JobExecution $jobExecution
     *
     * @see Job#execute(JobExecution)
     *
     * A unique working directory is created before the execution of the job. It is deleted when the job is terminated.
     * The working directory is created in the temporary filesystem. Its pathname is placed in the JobExecutionContext
     * via the key {@link \Akeneo\Tool\Component\Batch\Job\JobInterface::WORKING_DIRECTORY_PARAMETER}
     */
    final public function execute(JobExecution $jobExecution)
    {
        try {
            $workingDirectory = $this->createWorkingDirectory();
            $jobExecution->getExecutionContext()->put(JobInterface::WORKING_DIRECTORY_PARAMETER, $workingDirectory);
            $this->dispatchJobExecutionEvent(EventInterface::BEFORE_JOB_EXECUTION, $jobExecution);
            if ($jobExecution->getStatus()->getValue() !== BatchStatus::STOPPING) {
                $jobExecution->setStartTime(new \DateTime());
                $this->updateStatus($jobExecution, BatchStatus::STARTED);
                $this->jobRepository->updateJobExecution($jobExecution);
                $this->doExecute($jobExecution);
            } else {
                // The job was already stopped before we even got this far. Deal
                // with it in the same way as any other interruption.
                $jobExecution->setStatus(new BatchStatus(BatchStatus::STOPPED));
                $jobExecution->setExitStatus(new ExitStatus(ExitStatus::COMPLETED));
                $this->jobRepository->updateJobExecution($jobExecution);
                $this->dispatchJobExecutionEvent(EventInterface::JOB_EXECUTION_STOPPED, $jobExecution);
            }
            if (($jobExecution->getStatus()->getValue() <= BatchStatus::STOPPED)
                && (count($jobExecution->getStepExecutions()) === 0)
            ) {
                $exitStatus = $jobExecution->getExitStatus();
                $noopExitStatus = new ExitStatus(ExitStatus::NOOP);
                $noopExitStatus->addExitDescription("All steps already completed or no steps configured for this job.");
                $jobExecution->setExitStatus($exitStatus->logicalAnd($noopExitStatus));
                $this->jobRepository->updateJobExecution($jobExecution);
            }
            $this->dispatchJobExecutionEvent(EventInterface::AFTER_JOB_EXECUTION, $jobExecution);
            $jobExecution->setEndTime(new \DateTime());
            $this->jobRepository->updateJobExecution($jobExecution);
        } catch (JobInterruptedException $e) {
            $jobExecution->setExitStatus($this->getDefaultExitStatusForFailure($e));
            $jobExecution->setStatus(
                new BatchStatus(
                    BatchStatus::max(BatchStatus::STOPPED, $e->getStatus()->getValue())
                )
            );
            $jobExecution->addFailureException($e);
            $this->jobRepository->updateJobExecution($jobExecution);
            $this->dispatchJobExecutionEvent(EventInterface::JOB_EXECUTION_INTERRUPTED, $jobExecution);
        } catch (\Exception $e) {
            $jobExecution->setExitStatus($this->getDefaultExitStatusForFailure($e));
            $jobExecution->setStatus(new BatchStatus(BatchStatus::FAILED));
            $jobExecution->addFailureException($e);
            $this->jobRepository->updateJobExecution($jobExecution);
            $this->dispatchJobExecutionEvent(EventInterface::JOB_EXECUTION_FATAL_ERROR, $jobExecution);
        } finally {
            $workingDirectory = $jobExecution->getExecutionContext()->get(JobInterface::WORKING_DIRECTORY_PARAMETER);
            if (null !== $workingDirectory) {
                $this->deleteWorkingDirectory($workingDirectory);
            }
        }
    }
    /**
     * Handler of steps sequentially as provided, checking each one for success
     * before moving to the next. Returns the last {@link StepExecution}
     * successfully processed if it exists, and null if none were processed.
     *
     * @param JobExecution $jobExecution the current {@link JobExecution}
     *
     * @throws JobInterruptedException
     */
    protected function doExecute(JobExecution $jobExecution)
    {
        /* @var StepExecution $stepExecution */
        $stepExecution = null;
        foreach ($this->steps as $step) {
            $stepExecution = $this->handleStep($step, $jobExecution);
            $this->jobRepository->updateStepExecution($stepExecution);
            if ($stepExecution->getStatus()->getValue() !== BatchStatus::COMPLETED) {
                // Terminate the job if a step fails
                break;
            }
        }
        // Update the job status to be the same as the last step
        if ($stepExecution !== null) {
            $this->dispatchJobExecutionEvent(EventInterface::BEFORE_JOB_STATUS_UPGRADE, $jobExecution);
            $jobExecution->upgradeStatus($stepExecution->getStatus()->getValue());
            $jobExecution->setExitStatus($stepExecution->getExitStatus());
            $this->jobRepository->updateJobExecution($jobExecution);
        }
    }
    /**
     * Handle a step and return the execution for it.
     * @param StepInterface $step         Step
     * @param JobExecution  $jobExecution Job execution
     *
     * @throws JobInterruptedException
     *
     * @return StepExecution
     */
    protected function handleStep(StepInterface $step, JobExecution $jobExecution)
    {
        if ($jobExecution->isStopping()) {
            throw new JobInterruptedException("JobExecution interrupted.");
        }
        $stepExecution = $jobExecution->createStepExecution($step->getName());
        try {
            $step->execute($stepExecution);
        } catch (JobInterruptedException $e) {
            $stepExecution->setStatus(new BatchStatus(BatchStatus::STOPPING));
            $this->jobRepository->updateStepExecution($stepExecution);
            throw $e;
        }
        if ($stepExecution->getStatus()->getValue() == BatchStatus::STOPPING
            || $stepExecution->getStatus()->getValue() == BatchStatus::STOPPED) {
            $jobExecution->setStatus(new BatchStatus(BatchStatus::STOPPING));
            $this->jobRepository->updateJobExecution($jobExecution);
            throw new JobInterruptedException("Job interrupted by step execution");
        }
        return $stepExecution;
    }
    /**
     * Trigger event linked to JobExecution
     *
     * @param string       $eventName    Name of the event
     * @param JobExecution $jobExecution Object to store job execution
     */
    private function dispatchJobExecutionEvent($eventName, JobExecution $jobExecution)
    {
        $event = new JobExecutionEvent($jobExecution);
        $this->dispatch($eventName, $event);
    }
    /**
     * Generic batch event dispatcher
     *
     * @param string $eventName Name of the event
     * @param Event  $event     Event object
     */
    private function dispatch($eventName, Event $event)
    {
        $this->eventDispatcher->dispatch($eventName, $event);
    }
    /**
     * Default mapping from throwable to {@link ExitStatus}. Clients can modify the exit code using a
     * {@link StepExecutionListener}.
     *
     * @param \Exception $e the cause of the failure
     *
     * @return ExitStatus an {@link ExitStatus}
     */
    private function getDefaultExitStatusForFailure(\Exception $e)
    {
        if ($e instanceof JobInterruptedException || $e->getPrevious() instanceof JobInterruptedException) {
            $exitStatus = new ExitStatus(ExitStatus::STOPPED);
            $exitStatus->addExitDescription(get_class(new JobInterruptedException()));
        } else {
            $exitStatus = new ExitStatus(ExitStatus::FAILED);
            $exitStatus->addExitDescription($e);
        }
        return $exitStatus;
    }
    /**
     * Default mapping from throwable to {@link ExitStatus}. Clients can modify the exit code using a
     * {@link StepExecutionListener}.
     *
     * @param JobExecution $jobExecution Execution of the job
     * @param string       $status       Status of the execution
     *
     * @return an {@link ExitStatus}
     */
    private function updateStatus(JobExecution $jobExecution, $status)
    {
        $jobExecution->setStatus(new BatchStatus($status));
    }
    /**
     * Create a unique working directory
     *
     * @return string the working directory path
     */
    private function createWorkingDirectory()
    {
        $path = sys_get_temp_dir() . DIRECTORY_SEPARATOR . uniqid('akeneo_batch_') . DIRECTORY_SEPARATOR;
        try {
            $this->filesystem->mkdir($path);
        } catch (IOException $e) {
            // this exception will be catched by {Job->execute()} and will set the batch as failed
            throw new RuntimeErrorException('Failed to write to file %path%', ['%path%' => $path]);
        }
        return $path;
    }
    /**
     * Delete the working directory
     *
     * @param string $directory
     */
    private function deleteWorkingDirectory($directory)
    {
        if ($this->filesystem->exists($directory)) {
            $this->filesystem->remove($directory);
        }
    }
}